package com.example.wxg.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.TableSource;

/**
 * @author void
 * @date 2021/11/12 15:38
 * @desc
 */
public class TableApiTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        
        //创建tableSource
        TableSource source = new CsvTableSource("D://data/abc.csv", new String[]{"name","age"}, new TypeInformation[]{Types.STRING, Types.INT});
        tableEnv.registerTableSource("CsvTable", source);
        
        Table table = tableEnv.scan("CsvTable");
        Table result = table.select("name,age");
        DataStream<Student> stream = tableEnv.toAppendStream(result, Student.class);
        stream.print().setParallelism(1);
        env.execute("执行");
    }
    
    public static class Student{
        public String name;
        public Integer age;

        public Student() {
        }

        public Student(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "打印结果:Student{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}
